How to Build an Analytics Data Pipeline in Python
Introduction:
A data pipeline is a sequence of steps in data preprocessing. Data pipelines allow you to use a series of steps to convert data from one representation to another. A key part of data engineering is data pipelines. A common use case for a data pipeline is to find details about your website’s visitors. Like in Google Analytics, you know the importance of visitors seeing real-time and historical data. Then there are a series of steps in which each step delivers an output that is an input to the next step. This continues until it is full of the pipeline. In certain instances, it is possible to run separate steps in parallel. Data pipelines carry raw data to data warehouses for use by analytics business intelligence (BI) tools from software-as-a-service (SaaS) service systems and database sources. By writing code and manually interfacing with source databases, developers can create pipelines themselves.
Data Pipeline Architecture:
Data pipeline architecture is the design and structure of code and systems that copy, clean or transform as required and route source data to destination systems, such as data warehouses and lakes. The speed at which data passes through a data pipeline relates to three factors:
- Rate or throughput is how much information a pipeline can process in a given period.
- Data pipeline reliability allows the individual system to be fault-tolerant within a data pipeline. A secure data pipeline with built-in processes for auditing, logging and validation helps guarantee data quality.
- Latency is the time to pass through the pipeline for a single unit of data. Latency is more about reaction time than it is about volume or throughput.
Start Your 30-Day FREE TRIAL with Data Science Academy to Launch Your Career in Data Analysis. Connect with our experts to learn more about our data analytics courses.
Designing Data Pipeline:
A data pipeline architecture is layered.
- DATA SOURCES:
Thousands of possible data sources are provided by SaaS, and every company hosts hundreds of others on their systems. Data sources are essential to its architecture as the first layer in a data pipeline. There is nothing to ingest and pass through the pipeline without quality data.
- Ingestion
The processes that read data from data sources are the intake components of a data pipeline. Using an application programming interface (API) provided by the data source, an extraction method reads from each data source. However, you have to find out what information you want to collect through a process called data profiling.
- Transformation
When data is extracted from source systems, it might need to change its structure or format. It involves mapping coded values to more descriptive ones, filtering and aggregation. The combination is a particularly important type of transformation. It involves database joins, where it is possible to exploit relationships encoded in relational data models.
The timing of any transformations depends on what method of data replication an organization prefers to use in its data pipeline: ETL (extract, transform, load) or ELT (extract, load, transform). ETL is an older technology and will transform data until it is loaded to its destination. ELT, used in modern cloud-based data centers, loads data without any transformation being implemented. Inside a data warehouse, a data customer may then implement their data transformations.
- Destination
The water towers and holding tanks of the data pipeline are the destinations. The principal destination for data replicated through the pipeline is a data warehouse. These specialized databases include all cleaned, mastered data from an enterprise in a centralized location for analysts and executives to use in analytics, reporting and business intelligence.
Less-structured data can flow into data lakes, where vast volumes of rich and minable data can be accessed by data analysts and data scientists. Finally, an organization may feed information into an analytics tool or service that accepts data feeds directly.
- Monitoring
Data pipelines are complex networks consisting of components of the software, hardware and networking; all of which are exposed to failures. Developers must write tracking, logging and alerting code to help data engineers control output and fix any issues that occur to keep the pipeline operational and capable of extracting and loading data.
Example of Data Pipeline:
Here’s a simple example of a data pipeline that calculates how many visitors have visited the site each day:
Input:
Raw logs
Every day, getting from raw logs to guest counts. We go from raw log data to a dashboard where we can see visitor counts every day, as you can see above. Notice that this pipeline runs continuously; it grabs and processes them as new entries are added to the server log. Hopefully, there are a few things you have found about how we have organized the pipeline:
- Each part of the pipeline is isolated from the others and takes a given input into it and returns a defined output.
- Raw log data is stored in a database. This means that we have access to all of the raw data if we ever want to run a different analysis.
- We're deleting duplicate documents. In the analysis method, it is very simple to add duplicate data, so it is important to deduplicate before moving data through the pipeline.
- Each part of the pipeline feeds data into another element. We want to keep each component as small as possible so that we can scale up pipeline components individually or use the outputs for a different analysis type.
COUNTING VISITORS WITH DATA PIPELINES IN PYTHON:
We need a way, in each case, to get data from the current step to the next step. If we point out our next step in the database, which is counting IPs by day, it will be able to take out events as they are added based on time by querying. Although by using a queue to transfer data to the next stage, we can gain more performance, performance at the moment is not vital.
We're going to build another file, count visitors.py, and add some code that pulls information out of the database and counts by day.
First, we want to query the database's data. We: In the code below, we:
Connect A database relation.
The query for any rows added after a certain timestamp.
Fetch all the lines.
def get_lines(time_obj):
conn = sqlite3.connect(DB_NAME)
cur = conn.cursor()
cur.execute("SELECT remote_addr,time_local FROM logs WHERE created > ?",
[time_obj])
resp = cur.fetchall()
return resp
Then we need a way to extract the time and IP from each row we have requested. The code below is:
- Initialize two empty lists.
- Pull the time and ip out of the answer to the question and add them to the list.
def
get_time_and_ip(lines):
ips = []
times = []
for line
in lines:
ips.append(line[0])
times.append(parse_time(line[1]))
return ips, times
You may note that we parse the time in the above code from a string into a date, time object.
Below is the code for the parsing:
def parse_time(time_str):
time_obj = datetime.strptime(time_str, '[%d/%b/%Y:%H:%M:%S
%z]')
return time_obj
lines =
get_lines(start_time)
ips, times
= get_time_and_ip(lines)
if len(times) > 0:
start_time
= times[-1]
for ip,
time_obj in zip(ips, times):
day =
time_obj.strftime("%d-%m-%Y")
if day not in unique_ips:
unique_ips[day]
= set()
This code means that there is a key for each day for unique IPs, and the values are sets containing all the unique IPs that reach the web that day.
We can then take the code snippets from above so that they run every 5 seconds:
unique_ips = {}
counts = {}
start_time =
datetime(year=2017, month=3, day=9)
while True:
lines = get_lines(start_time)
ips, times = get_time_and_ip(lines)
if len(times) > 0:
start_time = times[-1]
for ip, time_obj in zip(ips, times):
day = time_obj.strftime("%d-%m-%Y")
if day not in unique_ips:
unique_ips[day] = set()
unique_ips[day].add(ip)
for k, v in unique_ips.items():
counts[k] = len(v)
count_list = counts.items()
count_list = sorted(count_list, key=lambda x: x[0])
for item in count_list:
print("{}: {}".format(*item))
time.sleep(5)
We've now taken a tour to produce our logs via a script, as well as two pipeline steps to analyze the logs. To get the entire pipeline running:
Clone the analytics pipeline repo from GitHub if
- you haven't already.
- To get it installed, follow the README.md file.
- Run log generator.py.
- Run store logs.py.
- Launch count visitors.py.
After running count visitors.py, every five seconds you should see the visitor counts printed out for the current day. If you leave several days of scripts going, you will start seeing multiple days of visitor counts. Congratulations! You set up and run a pipeline of data.
Connect with our experts to learn more. Start your 30-day free trial today.