top of page
1-modra.jpg
ClsoeIT Logo

Black Friday ready real-time analytics

E-commerce platforms like Shopify contain their reporting dashboards which cover tons of figures generated by the shop, but it might not be sufficient for all architectures. We looked at the case when a customer wants to achieve the following:


  • perform compound analytical tasks on multiple sources - multiple eshops, combination with physical store IT system,

  • or do more advanced basket analytics developed by your team in Spark,

  • or preprocess data further routed to your ML models,

  • or get an early Slack notification that a product is quickly becoming sold out,

  • and perform all that in real-time, even during high peak occasions like Black Friday.


To solve this challenge, we decided to use real-time stream processing by the Spark cluster within the Databricks platform and Amazon MSK. This way we can analyze the current state at every moment and react accordingly. Moreover, using AWS we can quickly scale up on demand and pay only for really utilized resources. So without sacrificing flexibility or processing performance in between peak sales periods the infrastructure cost is much lower.


As we already developed an app for Shopify in the past, the integration with AWS stack was just a formality. Register new testing stores and subscribe to webhooks was already a well-known path. Using AWS Serverless Application Model (SAM) we have one template project in git where we define our services like API Gateway routes connected to lambdas, new stores registered in DynamoDB, EventBridge rules for triggering lambdas, constants passed to services, providing static content, and saving backups to S3.


Target architecture diagram
Target architecture diagram

Webhooks

Subscribing to webhooks, in this case, product/update as we need to react to product stock change event by setting up a connection to Amazon EventBridge and calling Shopify API for GraphQL

const response = await fetch('https://${shopOrigin}/admin/api/2020-10/graphql.json', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    "X-Shopify-Access-Token": accessToken,
  },
  body: JSON.stringify({query})
});

with request body:

const query = '
  mutation {
    eventBridgeWebhookSubscriptionCreate(
      topic: ${topic}
      webhookSubscription: {
        arn: ${EVENT_BUS_ARN}
        format: JSON
      }
    ) {
      webhookSubscription {
        id
      }
      userErrors {
        message
      }
    }
  }
';

Amazon MSK setup

Creating minimal MSK in AWS is straightforward, following “Quick create”, defining cluster name, Kafka version, broker sizing, and storage amount. After some time amazon creates the cluster and we get our connection strings.


As far as we know MSK does not have any GUI showing the Kafka topics and their content, so we decided to create a new EC2 instance as a Kafka client where we could test and debug the stream during development.


Created MSK is in a VPC and Security Group, where we had to define Inbound rules to allow connection to the EC2 with Kafka client. This had to be done also for lambdas which were connecting to MSK, in our case Kafka Producer and Kafka Consumer lambdas described further.


Kafka Producer

Next, we created a lambda where we used KafkaJS to connect to Amazon MSK and produce messages to our defined topic from the received webhook payload. We are sending messages in the format of the shop domain as a key and a JSON object as a value.


With this implemented, we could see the produced messages in EC2 where we connected to our topic using a script in the Kafka directory.

/bin//kafka-console-consumer.sh --bootstrap-server b-2.kafka-cluster-demo.....kafka.eu-central-1.amazonaws.com:9092 --topic {our_kafka_topic} --from-beginning

Databricks (Spark)

To use Databricks, we had to set up our connection to MSK. As Databricks and MSK are in different VPCs we had to create a peering connection between them and allow routes to one another. Similar to this guide from Databricks: How to set up Apache Kafka on Databricks.


Finally, our Databricks cluster could connect to our Amazon MSK and we could process the events from our topic. In Databricks notebook, we can do any kind of analytics together with visualizations, data cleaning, and writing the desired result back to a new MSK topic. In our case, we wanted to get the current amount of each product in stock where the available quantity changed. So our analysis looked similar to this:

dfStream.select(
  "key", "timestamp", "payload", from_json(col("variants"), variantSchema).alias("variant")
)\
.na.drop("all", subset=["variant.id"])\
.groupBy(col("payload.id").alias("id"), "timestamp")\
.agg(
  first("payload.title").alias("title"),
  min(col("variant.price").cast("float")).alias("minPrice"), 
  max(col("variant.price").cast("float")).alias("maxPrice"),
  sum("variant.inventory_quantity").alias("quantity")
)\
.orderBy("timestamp")

The only thing left that had to be done was filter by a criteria, likelihood of getting out-of-stock soon. It can be a constant or some function based on previous values (gradient). We wrote the analyzed result back to MSK for further process in Kafka Consumer lambda.


Now, as we analyzed the stream in the notebook, we created a Job that runs the notebook and set the restart when the current Job fails.


Kafka consumer

As this is specific for the customer and the vendor of the specific product, there can be different actions how Kafka consumer lambda can order new products and refill stock. We can call directly the target API, send emails with order requests and create pending transfers in Shopify, create a Jira issue or write a Slack message where someone will take care of the order further.


Conclusion

Overall, our solution demonstrates how Databricks and Kafka can be a valuable tool to react to different events in real time or perform advanced analytics beyond standard reporting features.

Related Posts

See All

MGS to Databricks connector

Analytics teams in banks and insurance companies use platforms like Databricks for data science, ML/AI model development, model testing,...

Comments


bottom of page