Skip to main content
留学咨询

程序代写案例-ECS765P

By January 8, 2021No Comments

SOLUTIONS AND MARKING SCHEME January 2019-2020 ECS765P Big Data Processing Duration: 2 hours YOU ARE NOT PERMITTED TO READ THE CONTENTS OF THIS QUESTION PAPER UN- TIL INSTRUCTED TO DO SO BY AN INVIGILATOR. Instructions: This paper contains FOUR questions. Answer ALL questions Calculators are permitted in this examination. Please state on your answer book the name and type of machine used. COMPLETE ALL ROUGH WORKINGS IN THE ANSWER BOOK AND CROSS THROUGH ANY WORK WHICH IS NOT TO BE ASSESSED. IMPORTANT NOTE: THE ACADEMIC REGULATIONS STATE THAT POSSESSION OF UNAUTHORISED MA- TERIAL AT ANY TIME WHEN A STUDENT IS UNDER EXAMINATION CONDITIONS IS AN ASSESSMENT OFFENCE AND CAN LEAD TO EXPULSION FROM QMUL. PLEASE CHECK NOW TO ENSURE YOU DO NOT HAVE ANY NOTES, MOBILE PHONES OR UNATHORISED ELECTRONIC DEVICES ON YOUR PERSON. IF YOU HAVE ANY THEN PLEASE RAISE YOUR HAND AND GIVE THEM TO AN INVIGILATOR IMMEDIATELY. PLEASE BE AWARE THAT IF YOU ARE FOUND TO HAVE HIDDEN UNAUTHORISED MA- TERIAL ELSEWHERE, INCLUDING TOILETS AND CLOAKROOMS IT WILL BE TREATED AS BEING FOUND IN YOUR POSSESSION. UNAUTHORISED MATERIAL FOUND ON YOUR MOBILE PHONE OR OTHER ELECTRONIC DEVICE WILL BE CONSIDERED THE SAME AS BEING IN POSSESSION OF PAPER NOTES. MOBILE PHONES CAUSING A DISRUPTION IS ALSO AN ASSESSMENT OFFENCE. EXAM PAPERS MUST NOT BE REMOVED FROM THE EXAM ROOM. Examiners: Dr Felix Cuadrado and Dr Johan Pauwels © Queen Mary University of London, 2019-2020 Page 2 ECS765P (2019-2020) Question 1 You have a dataset from a non-profit organisation listing the full history of member contri- butions for emergency appeals when e.g. a disaster happens. For the next emergency appeals campaign, the organization is aiming to send postal envelopes to the 500 highest contributors up to this point. The dataset is a collection of rows, each one registering an individual contribution from a member in the following format: timestamp;memberId;isMultipleDonor;donationAmount;memberAge The isMultipleDonor field is a boolean field that will be True if the member has made more than one donation. donationAmount contains the value of that individual donation, whereas memberAge includes for how many months they have been a member. (a) (i) Design a combination of *two* MapReduce programs that computes the top 500 members with highest individual average contributions to these urgent appeal campaigns. You should only consider members with more than one individual contribution. The code flow must be explained, discussing the input and output of each Map and Reduce function that has been defined (without detailing the implementation). You may use a diagram to illustrate the overall data flow. Note: while a single MapReduce solution is technically possible, it would result in potentially some scalability/performance issues. Single MapReduce solutions should mention these aspects specifically. (ii) Write the pseudocode for one of the two MapReduce jobs you designed in the previous question. State in your solutions any assumptions that are made as part of the program, as well as the behaviour of any custom function you deem necessary. [15 marks — basic] Solution: The first program will compute the average contribution of each member from the dataset that is a multiple times donor 3 . That input will be fed to the second program that will compute the global average (numerical summarisation) of all these results 3 . First mapper will emit (memberId, donationAmount) 1 , First reducer will emit (memberId, averageAmount) 1 Second mapper will emit (None, (memberId,averageAmount)), 1 Second reducer will emit 500 x (memberId, ranking, averageAmount) 1 Either of these two MR programs is sufficient for the 5 marks Turn over ECS765P (2019-2020) Page 3 def mapper1(self, _, line):\mk{1} fields = line.split(“;”) multipleDonor = fields[2] if(multipleDonor):\mk{1} memberId = fields[1] amount = fields[3] yield (memberId, amount)\mk{1} def reducer1(self, memberId, values):\mk{1} average = computeAverage(values) yield(memberId, average) \mk{1} def mapper2(self, memberId, average): \mk{1} yield(None, (memberId, average)) \mk{1} def reducer2(self, _, values): \mk{1} sorted = values.sortByValue() top500 = sorted.getTop(500)\mk{1} for item in top500: yield(item[0], item[1]) \mk{1} (b) This question is about the Combiner in the context of MapReduce jobs. (i) Explain who runs the Combiner function and at what point in time Combiner functions execute during the shuffle and sort stage of MapReduce jobs. (ii) Discuss whether the *second* job from 1a) would benefit from using a Combiner. To do so, first list what will change when a Combiner runs, and then explain its performance impact. If you completed 1a) with a single MapReduce job, then answer the question referring to that job. [10 marks — medium] Solution: A Combiner is an intermediate Reducer that is executed at each of the Mapper nodes 2 . The Combiner runs after the Mappers have finished their execution, and before data is transferred to the Reducers 2 . Combiner runs according to Hadoop from 0 to multiple times, as it’s optional and idempotent 1 . The second job can run the same Reducer as a Combiner 2 . This will prevent memory issues when running the job 1 , and make it complete considerably faster, as a single Reducer invocation would need to sort every single item that Turn over Page 4 ECS765P (2019-2020) needs to be ranked. 2 Turn over ECS765P (2019-2020) Page 5 Question 2 (a) You have a dataset of user ’reactions’ from the Instagram social media network. The dataset contains one entry for each time a user liked a specific post, recording the following information: timestamp;userId;storyId;reactionType; reactionType records the type of emoji users reacted to the message, with potential types being ”like”, ”dislike”, or ”love” This dataset is given as input to the following MapReduce job: def mapper(self, _, line): fields = line.split(“;”) id = fields[1] reactionType = fields[3] yield (id, (reactionType,1)) def reducer(self, id, reactions): reactionCounts = sumReactionsByType() #sumReactionsByType() sums the partial totals # for each Type, obtaining a dictionary of # types and counts for that reaction top = getMax(reactionCounts) yield ( id, top.getType()) What will be the outcome of the MapReduce job? Explain the outcome as well as the nature of information exchanged in the job execution. [5 marks — basic] Solution: The outcome will be the most common type of reaction for each post that has been posted in the dataset 3 . The mapper sends for each reaction the post id as a key, and a pair with the type of reaction and one as a pair 1 . The reducer receives all the reactions for a post, aggregates them together by type, and yields the most common one 1 . (b) Suppose the program presented in 2a) will be executed on a dataset of 5 billion (5,000,000,000) reactions, collected over 3*365 days of data. In total there are 50,000,000 stories that have been reacted to. The total size of the dataset is 4*128 Gigabytes. The cluster has 5000 worker nodes (all of them idle), and HDFS is configured with a block size of 128MB. Using that information, provide a reasoned answer to the following questions. State Turn over Page 6 ECS765P (2019-2020) any assumptions you feel necessary when presenting your answer. Note: you can consider in your computations either 1kB = 1000 bytes, or 1 kB = 1024 bytes. (i) How many physical nodes will be involved during the execution of the Map and Reduce tasks of the job? How many input splits are processed at each physical node? (ii) In each single worker node, how many times does the Map method run? (iii) How many times will the Reduce method be invoked at each reducer node? [10 marks — medium] Solution: There is one data split per HDFS block: 4*128GB/128MB = 4000 splits that need to be processed. The ApplicationMaster will attempt to assign one to a node, taking 4000 nodes for Map (ideally local nodes with the data already stored) 2 . As there is enough capacity, one split will be processed by each involved physical node 2 . The number of reduce nodes is user defined, by default 2 1 If we assume bal
anced size of all splits in number of lines, there will be 5 billion lines/4000 mappers = 1.25 million lines per Mapper 3 . Students can either use nreducers, or give it a value. The number of unique keys in this case is the 50 million stories. Number of reduce invocations is 50 million/nred (25m with the default 2) 2 . (c) Define the concept of data skew in distributed data processing. For the program defined in 2a) and the data and environment figures presented in 2b), discuss whether data skew would cause a noticeable effect while running the provided code. If that will be the case, mention (no need to fully detail) one potential change to the job that would greatly improve the skew effect. [10 marks — advanced] Solution: Data skew affects MapReduce jobs by having an unbalanced distri- bution of values between the Reduce keys, hence unbalancing the distributed load. 3 In this particular case, the dataset being of social interactions, data skew will be a notable effect, with some posts being vastly more popular than others. This will result in potentially heavy data skew. 3 A simple way to palliate this skew would be to add a Combiner to the mappers that aggregates together the counts of reactiontypes for the same post id. 2 As long as posts are emitted by the same mapper, this approach will be effective. 2 Turn over ECS765P (2019-2020) Page 7 Question 3 (a) (i) Give one example of a Spark element-wise operation. Describe how would the Reduce step of an equivalent Hadoop job look like. (ii) Based on your previous answer, discuss the parallelisation potential of this element-wise operation as a function of the size of the input data size. De- tail in your explanation what nodes will be active during the computation, as well as the data transferred over the network. [9 marks] Solution: i) Example: map, flatMap, filter. The Reduce step is not needed, i.e element-wise operations are Map-only jobs 3 . ii) An element-wise or map-only operation is embarrassingly parallel, which means that scaling up to a larger cluster is trivial 2 . All nodes that contain a partition of the data will be used in the computation 2 . Since no shuffling of the data is needed, no transfers over the network are needed 2 . Marking scheme: i) Three marks: basic i)) Six marks: basic-medium (b) You are given a dataset with the historical results of the top-tier English Football League per season. It is in a CSV (comma separated value) format and consists of the following fields: Season ,Team, Wins , Draws , Losses , GoalsFor , GoalsAgainst An example of this data is as follows: 1888−1889, Preston North End,18 ,4 ,0 ,74 ,15 1888−1889,Aston V i l l a ,12 ,5 ,5 ,61 ,41 1888−1889,Wolverhampton Wanderers ,12 ,4 ,6 ,51 ,37 1888−1889, Blackburn Rovers ,10 ,6 ,6 ,66 ,45 1888−1889, Bol ton Wanderers ,10 ,2 ,10 ,63 ,59 Suppose you execute the following Python Spark job on the dataset: 1l i n e s = sc . t e x t F i l e ( ’ hdfs : / / inputPath . csv ’ ) 2f i l t e r e d 1 = l i n e s . f i l t e r ( lambda l : len ( l . s p l i t ( ” , ” ) ) == 7) 3rows = f i l t e r e d 1 .map( lambda l i n e : l i n e . s p l i t ( ” , ” ) ) 4f i l t e r e d 2 = rows . f i l t e r ( lambda r : r [ 2 ] == r [ 4 ] ) 5mapped = f i l t e r e d 2 .map( lambda r : ( r [ 1 ] , 1 ) ) 6r e s u l t = mapped . reduceByKey ( lambda a , b : a+b ) 7r e s u l t . saveAsTextFi le ( ’ hdfs : / / outputPath ’ ) We provide for reference the following information about the Python and Spark-specific functions appearing in the program: Turn over Page 8 ECS765P (2019-2020) filter is a Spark transformation that for each input element will either generate the same element as output if the condition expressed in the function is true, or no element at all if it is false. map is a Spark transformation that for each input element generates one output element, whose value is dictated by the function. reduceByKey is a Spark transformation that takes as input an RDD of pairs of elements, first groups all the pairs, putting together the ones with the same key, and generates one pair as a result of the group. The key of the result is the common key, and the value is the result of reducing the group of values into a single one by applying the function. min is a Spark action that returns to the driver the minimum item as specified by the optional key function. saveAsTextFile is a Spark action that saves the RDD to the provided HDFS path. len is the Python function that returns the length of a string, or list. split is the Python function that splits a string into a list of strings, using the provided argument as separator. int is the Python function that converts the string representation of a number into its numerical value. (i) Explain what the final result of the code will be. Discuss what each line in the code does to end up at this result. It may be easier to reference each of these lines by their number or variable names. (ii) Detail the changes you need to make to the above code in order to find what season had the least number of draws in history. The output needs to be a string of the form “The least number of draws (N) happened in season YYYY-YYYY”. You may explain these changes in your own words, or provide the code in pseudocode or Python. [16 marks] Solution: i) The code returns the number of seasons each team had as many wins as losses 2 . Line 1 creates an RDD by reading a text file from HDFS 1 . Line 2 filters the data for lines which do not have the correct format/have missing fields 1 . Line 3 splits CSV lines into separate values 1 . Line 4 keeps only the seasons when a team had as many wins as losses 1 . Line 5 creates key-value pairs of the team name and a value of 1 of the output of the previous line 1 . Line 6 sums the values of 1 per team name to count the number of times each time has had as many wins as losses in a season 1 . Line 7 is an action that writes the result to HDFS 1 . ii) Something along the lines of: Turn over ECS765P (2019-2020) Page 9 1l i n e s = sc . t e x t F i l e ( ’ hdfs : / / inputPath . csv ’ ) 2wel l formed = l i n e s . f i l t e r ( lambda l : len ( l . s p l i t ( ” , ” ) ) == 7) 3rows = wel l formed .map( lambda l : l . s p l i t ( ” , ” ) ) 4season draws = rows .map( lambda r : ( r [ 0 ] , i n t ( r [ 3 ] ) ) ) 5draws per season = season draws . reduceByKey ( lambda x , y : x+y ) 6r e s u l t = draws per season . min ( key=lambda x : x [ 1 ] ) 7answer = ’ The l e a s t number o f draws ({} ) happened i n 8season {} ’ . format ( r e s u l t [ 1 ] , r e s u l t [ 0 ] ) Marking scheme: i) Nine marks: basic-medium. ii) Seven marks: medium- advanced. Line 4 2 , line 5 2 , line 6 2 , line 7–8 1 . Similar code will be given equivalent marks as long as they implement analogous functions. Turn over Page 10 ECS765P (2019-2020) Question 4 (a) This question is about different types of stream processing. (i) Components of a streaming algorithm are either stateless or stateful. Explain what these terms mean in this context and give an example computation for each. (ii) Micro-batching and pure stream processing both run on unbounded data. Explain how these two processing models differ. Discuss in what situation each would be more appropriate, by making reference to latency and throughput considerations. [12 marks — medium] Solution: i) A stateless computation is one where the function will be run on each tuple 1 without the influence or ’state’ maintained from any other 1 , an example of this would be a filter 1 . Stateful operations may maintain a state 1 based on tuples previously seen 1 , for example an aggregate count for keys 1 . ii) Analysis ran in a pure stream processing fashion will operate on every tuple as they arrive 1 , leading to low latency of results 1 . Micro-batching on the other-hand may wait x number of seconds before triggering 1 the operation and executing on all ingested tuples, often leading to higher throughput, but obviously slower return of the result 1 . Pure streaming may be more appropriate when there is some real-time requirement or alerts 1 , whilst micro-batching may be more appropriate for windowing
1 . (b) This question is about the scalability of streaming systems. (i) Streaming systems often consist of data sources and computational units; Spouts and Bolts respectively in Apache Storm. Describe two things that may happen to these components when a system is scaled up from a single node to a cluster of machines for higher throughput. [6 marks — basic] Solution: i) The first thing that may happen to the bolts and spouts is that they are distributed among the nodes within the cluster instead of being on the same machine 3 . The second thing to occur would be replication of the bolts to allow their tasks to be parallelised 3 . (c) Graphs contain a set of vertices (entities within the data) alongside a set of edges (relationships between entities). Parallel Breadth First Search is a popular algorithm for resolving the shortest path between two entities within a graph. Explain how the Pregel model (think like a vertex) can be used to implement graph algorithms like Parallel Breadth First Search in a distributed environment such as Turn over ECS765P (2019-2020) Page 11 Spark GraphX. Give one example use case of finding the shortest path between entities. [7 marks — advanced] Solution: i) In the Pregel model computation is carried out in supersteps 1 . Each vertex gets a function to run every superstep, within this they may ingest messages, calculate some internal state, send message’s to neighbours and vote to halt 2 . Each vertex may execute in parallel and may, therefore, be distributed across any number of machines 2 . An example algorithm for shortest path is routing traffic between two nodes in a computer network 2 . End of questions 欢迎咨询51作业君

admin

Author admin

More posts by admin