The importance of building your pipeline toolbox from small, independent segments of platform-agnostic code

Catherine Azam
4 min read · Aug 15, 2021
[Image: the output after parsing, as it would look in a Jupyter notebook]

An important task in Data Engineering is writing efficient, scalable pipelines that move data from A to B. There are many GUI tools for this, and they are useful because there is a real shortage of Data Engineers at the moment.

As Data Engineers, however, we should not be limited by what any one tool can do for us, locked in by a vendor, or forced to spend time figuring out how a brand-new, hip GUI works every few months.

It is therefore preferable to build a toolbox of migration scripts for the most common use cases. This series of articles is intended to help you build that toolbox by providing common examples that will save you many hours of reading documentation.

My tool of choice is Python’s Pandas, as usual. Today I will demonstrate how to query the NewRelic API and send the data into MongoDB.

You will need to pip install pymongo like this:

pip install pymongo

This is the helper function I wrote to get a dataframe into Mongo:

# To connect
import pandas as pd
from pymongo import MongoClient


def write_df_to_mongoDB(my_df,
                        database_name='MY_DATABASE',
                        collection_name='MY_COLLECTION',
                        server='localhost',
                        mongodb_port=27017,
                        chunk_size=100):
    client = MongoClient(server, int(mongodb_port))
    db = client[database_name]
    collection = db[collection_name]
    # To write: turn the index into a column, then each row into one document
    my_list = my_df.reset_index().to_dict('records')
    # Insert the documents in batches of chunk_size
    for i in range(0, len(my_list), chunk_size):
        collection.insert_many(my_list[i:i + chunk_size])
    client.close()
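The helper takes a `chunk_size` argument, which suggests writing the records in batches rather than in one large call. Here is a minimal sketch of that batching logic on its own. `chunked`, `write_in_chunks` and `FakeCollection` are hypothetical names that are not part of pymongo; the fake collection only mimics `insert_many` so the example runs without a MongoDB server.

```python
from typing import Dict, Iterator, List


def chunked(records: List[Dict], chunk_size: int) -> Iterator[List[Dict]]:
    """Yield consecutive slices of at most chunk_size records."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    for start in range(0, len(records), chunk_size):
        yield records[start:start + chunk_size]


class FakeCollection:
    """Hypothetical stand-in that mimics only insert_many, for a dry run."""

    def __init__(self) -> None:
        self.batches: List[List[Dict]] = []

    def insert_many(self, documents: List[Dict]) -> None:
        self.batches.append(list(documents))


def write_in_chunks(records: List[Dict], collection, chunk_size: int = 100) -> int:
    """Send records to collection.insert_many one batch at a time."""
    written = 0
    for batch in chunked(records, chunk_size):
        collection.insert_many(batch)
        written += len(batch)
    return written


records = [{"index": i, "crash_rate_percent": 0.5} for i in range(250)]
fake = FakeCollection()
total = write_in_chunks(records, fake, chunk_size=100)
print(total, [len(batch) for batch in fake.batches])
# 250 [100, 100, 50]
```

Because an empty list produces no batches at all, this approach also avoids calling `insert_many` with nothing to insert.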

Getting some data:

Now, as an example, say you’d like to analyze the crash rates that NewRelic makes available for your device. You would write a scraper like this:

1. Define the date range over which to extract data:

from datetime import date, timedelta

def get_date_range(self):
    '''This is an example using 30 days' worth of data. Adjust this to your volume.'''
    today = date.today()
    dates = []
    time_delta = 30
    today_q = today.strftime('%Y-%m-%d')
    date_range = today - timedelta(days=time_delta)
    start_date = date_range.strftime('%Y-%m-%d')
    dates.append(start_date + " 00:00:00")  # start of the window
    dates.append(today_q + " 23:59:00")     # end of the window
    return dates
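To see what this step produces, here is a standalone sketch of the same window calculation. The function name `date_window` and its `today` parameter are my own additions for illustration; passing the date in makes the result reproducible.

```python
from datetime import date, timedelta
from typing import List, Optional


def date_window(days: int = 30, today: Optional[date] = None) -> List[str]:
    """Return [start, end] strings covering the last `days` days."""
    today = today or date.today()
    start = today - timedelta(days=days)
    return [
        start.strftime("%Y-%m-%d") + " 00:00:00",
        today.strftime("%Y-%m-%d") + " 23:59:00",
    ]


print(date_window(30, today=date(2021, 8, 15)))
# ['2021-07-16 00:00:00', '2021-08-15 23:59:00']
```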

2. Parse the JSON that gets returned, using pandas to give it structure. This is of course much more important when inserting into a relational database, but regardless, it is best practice to do this at this stage of the pipeline, as you may have JSON data from various sources and need to standardize it for your apps to use:

def _get_df(self):
    asdict1 = self._make_request()  # must be the parsed JSON (a dict), not raw text
    df1 = pd.DataFrame(asdict1["timeSeries"])
    # Each "results" cell holds a list with one dict; pull out its "result" value
    df1["results"] = df1["results"].str.get(0)
    df1["results"] = df1["results"].str.get('result').astype(float)
    df1["crash_rate_percent"] = 100 - df1["results"]
    # Epoch seconds to UTC timestamps
    df1["start"] = pd.to_datetime(df1["beginTimeSeconds"], unit="s")
    df1["end"] = pd.to_datetime(df1["endTimeSeconds"], unit="s")
    df1["player"] = "my_device"
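It can help to try the parsing step on a small payload before pointing it at the live API. The sketch below does that. The `sample` payload is hypothetical: its shape is inferred only from the fields the code above reads (`timeSeries`, `results`, `result`, `beginTimeSeconds`, `endTimeSeconds`), and a real response may carry more. `payload_to_frame` is an illustrative name.

```python
import pandas as pd

# Hypothetical payload, shaped after the fields the parsing code reads
sample = {
    "timeSeries": [
        {"results": [{"result": 99.5}],
         "beginTimeSeconds": 1628985600, "endTimeSeconds": 1628989200},
        {"results": [{"result": 98.75}],
         "beginTimeSeconds": 1628989200, "endTimeSeconds": 1628992800},
    ]
}


def payload_to_frame(payload, player="my_device"):
    """Flatten the nested time series into one row per time bucket."""
    df = pd.DataFrame(payload["timeSeries"])
    # Take the "result" value of the first entry in each "results" list
    df["results"] = df["results"].apply(lambda r: float(r[0]["result"]))
    df["crash_rate_percent"] = 100 - df["results"]
    # Epoch seconds to UTC timestamps
    df["start"] = pd.to_datetime(df["beginTimeSeconds"], unit="s")
    df["end"] = pd.to_datetime(df["endTimeSeconds"], unit="s")
    df["player"] = player
    return df


frame = payload_to_frame(sample)
print(frame[["start", "crash_rate_percent", "player"]])
```

The first row should show a start of 2021-08-15 00:00:00 and a crash rate of 0.5.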

3. Call your helper function to insert the data into the storage of your choice. We split all this into separate modules because next time you may well find that the client wants the data in a different sort of storage. You see how this keeps you fast, agile and independent of any platform or GUI tool?

    write_df_to_mongoDB(df1)
    print("*****************************************************************************************")
    print("way to go, sent to local MongoDB")
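One way to keep the storage swappable is to give every backend a writer with the same signature, so the last step of the pipeline never needs to know where the data goes. The sketch below illustrates the idea with two stand-in backends, a JSON Lines file and an in-memory list. All names here (`make_jsonl_writer`, `make_memory_writer`, `load`) are hypothetical; a MongoDB writer could wrap the helper above in the same way.

```python
import json
import os
import tempfile
from typing import Callable, Dict, List

Record = Dict[str, object]
Writer = Callable[[List[Record]], int]


def make_jsonl_writer(path: str) -> Writer:
    """Build a writer that appends records to a JSON Lines file."""
    def write(records: List[Record]) -> int:
        with open(path, "a", encoding="utf-8") as handle:
            for record in records:
                # default=str turns timestamps and similar values into text
                handle.write(json.dumps(record, default=str) + "\n")
        return len(records)
    return write


def make_memory_writer(store: List[Record]) -> Writer:
    """Build a writer that keeps records in a plain list."""
    def write(records: List[Record]) -> int:
        store.extend(records)
        return len(records)
    return write


def load(records: List[Record], writer: Writer) -> int:
    """Last pipeline step: it only depends on the writer's signature."""
    return writer(records)


records = [{"player": "my_device", "crash_rate_percent": 0.5}]
path = os.path.join(tempfile.mkdtemp(), "crash_rates.jsonl")
memory: List[Record] = []
print(load(records, make_jsonl_writer(path)), load(records, make_memory_writer(memory)))
# 1 1
```

With a DataFrame, `df.to_dict('records')` gives the list of records that these writers expect.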

Since we are in full control of this data pipeline, we can now do whatever we wish with it. We could automatically generate charts from the data and send them to Slack, a front-end client, a management reporting tool, or anything else you can think of.

Have a look at this example which produces a time-series chart from the data we just extracted:

import matplotlib.pyplot as plt
import seaborn as sns

sns.set_theme(style="darkgrid")
plt.figure(figsize=(15, 10))
# Plot the crash rate over time
sns.lineplot(x="start", y="crash_rate_percent", data=df1)
plt.title("Crash rates for APPLE TV", fontsize=20)

For our sample device the chart looks like this:

[Image: demo of how to visualize data in Python to integrate into our automated pipeline]
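In an automated pipeline there is usually no screen to show the chart on, so the figure has to be written to a file that a later step can pick up. A minimal sketch of that, assuming plain matplotlib with the off-screen `Agg` backend; the function name `save_crash_chart` and the sample values are made up for illustration.

```python
import os
import tempfile
from datetime import datetime, timedelta

import matplotlib
matplotlib.use("Agg")  # render off-screen, no display needed
import matplotlib.pyplot as plt


def save_crash_chart(starts, crash_rates, path, title="Crash rates"):
    """Draw a time-series line chart and save it as an image file."""
    fig, ax = plt.subplots(figsize=(15, 10))
    ax.plot(starts, crash_rates)
    ax.set_xlabel("start")
    ax.set_ylabel("crash_rate_percent")
    ax.set_title(title, fontsize=20)
    fig.savefig(path)
    plt.close(fig)  # free the figure so long-running jobs do not leak memory
    return path


# Made-up sample values, one point per hour
starts = [datetime(2021, 8, 15) + timedelta(hours=h) for h in range(4)]
rates = [0.5, 1.25, 0.75, 1.0]
chart_path = save_crash_chart(
    starts, rates, os.path.join(tempfile.mkdtemp(), "crash_rates.png")
)
print(chart_path)
```

The saved file can then be attached to whatever channel the pipeline reports to.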

This is the complete module we just wrote:

import os
import time
from random import randint
from datetime import date, timedelta

import pandas as pd
import requests

from data_helpers import write_df_to_mongoDB


class getData:
    '''This class lets you scrape New Relic data
    which can be used for AI and Machine Learning classification and prediction tasks'''

    def __init__(self, url="https://insights-api.newrelic.com/blabla", url2="", url3=""):
        '''API key needs to be set in the ENV.
        url, url2 and url3 are the pieces of your query URL that sit around the two dates.'''
        self.url = url
        self.url2 = url2
        self.url3 = url3
        # The variable name is only an example; use the one your environment defines
        self.key = os.environ["NEW_RELIC_QUERY_KEY"]

    def get_date_range(self):
        '''This is an example using 30 days' worth of data. Adjust this to your volume.'''
        today = date.today()
        dates = []
        time_delta = 30
        today_q = today.strftime('%Y-%m-%d')
        date_range = today - timedelta(days=time_delta)
        start_date = date_range.strftime('%Y-%m-%d')
        dates.append(start_date + " 00:00:00")  # start of the window
        dates.append(today_q + " 23:59:00")     # end of the window
        return dates

    def _make_request(self):
        start_date, end_date = self.get_date_range()
        headers = {"Accept": "application/json", "X-Query-Key": self.key}
        r1 = requests.get(self.url + start_date + self.url2 + end_date + self.url3,
                          headers=headers)
        # print(r1.status_code)  # if you run into problems check this first :-)
        time.sleep(randint(1, 5))  # short random pause between calls
        return r1.json()  # parsed JSON as a dict

    def _get_df(self):
        asdict1 = self._make_request()
        df1 = pd.DataFrame(asdict1["timeSeries"])
        # Each "results" cell holds a list with one dict; pull out its "result" value
        df1["results"] = df1["results"].str.get(0)
        df1["results"] = df1["results"].str.get('result').astype(float)
        df1["crash_rate_percent"] = 100 - df1["results"]
        # Epoch seconds to UTC timestamps
        df1["start"] = pd.to_datetime(df1["beginTimeSeconds"], unit="s")
        df1["end"] = pd.to_datetime(df1["endTimeSeconds"], unit="s")
        df1["player"] = "my_device"
        write_df_to_mongoDB(df1)
        print("*****************************************************************************************")
        print("way to go, sent to local MongoDB")

    def run(self):
        print("I will now fetch the data")
        self._get_df()
        return "data inserted successfully into collection."


if __name__ == "__main__":
    getData().run()
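The module expects the API key to come from the environment. Here is a small sketch of how that lookup might be done so that a missing key fails early with a readable message. The function name `read_api_key` and the variable name `NEW_RELIC_QUERY_KEY` are placeholders of my own choosing; use whatever name your deployment defines.

```python
import os


def read_api_key(var_name: str = "NEW_RELIC_QUERY_KEY") -> str:
    """Return the API key from the environment, or fail with a clear message."""
    key = os.environ.get(var_name, "").strip()
    if not key:
        raise RuntimeError(
            f"Set the {var_name} environment variable before running the pipeline"
        )
    return key


# Demo only: in practice the variable is set outside the script
os.environ["NEW_RELIC_QUERY_KEY"] = "demo-key"
print(read_api_key())
# demo-key
```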

And it’s done. If you liked this article, please hit the corresponding button, as I am new to writing Medium articles and would love to know what is useful to other Data Professionals. Also, feel free to message me on LinkedIn if you would like to see an article about a specific data operation. If it is Python-based, I would be more than happy to help.

Catherine Azam

Google Cloud Certified Data Engineer Professional, AI Architect and Data Plumber.