Automate Everything - Automate NSQ Statistic reporting with Haskell

Several months ago, I was given the task of making a report containing statistics for several NSQ topics. The job isn't hard: we just need to check the NSQ statistics every hour and write them into a shared Excel file. After doing this manually several times, I figured I could automate it easily, so I wrote a script in Go to automate most of it. You can find my code right here. The code just works; I haven't looked at it again since I wrote it, so forgive me if it looks bad.

Well, the title is about Haskell, so let's get into it. After I finished that Go project, I thought I could continue learning Haskell by building several small projects like nsq-message-checker-go. So here we go: this is how I made the Haskell version of nsq-message-checker-go. You can find the completed code on GitLab.

main :: IO ()
main = echo "Let's code some Haskell"

What we need

  1. Stack. I don't know any way to write Haskell without Stack, so bear with me.
  2. Intero. This is the best interactive development tool for Haskell; if you disagree, you can complain to the person who influenced me.
  3. A text editor that works painlessly with Intero. I use VS Code with the Haskero plugin, but you can also use Vim or Emacs. Emacs works out of the box; if you manage to integrate Vim/NeoVim with Intero, I would really appreciate it if you told me how.

Project Setup

Let's create a new project using stack.

stack new nsq-message-checker
# output
Downloading template "new-template" to create project "nsq-message-checker" in nsq-message-checker/ ...
Looking for .cabal or package.yaml files to use to init the project.
Using cabal packages:
- nsq-message-checker/

Selecting the best among 13 snapshots...

Downloaded lts-11.13 build plan.
* Matches lts-11.13

Selected resolver: lts-11.13
Initializing configuration using resolver: lts-11.13
Total number of user packages considered: 1
Writing configuration to file: nsq-message-checker/stack.yaml
All done.

After the project has been created successfully, we should try building the template project.

# Do not forget to run `stack setup` first
stack setup
stack build
# output
Registering library for nsq-message-checker-

If the last line of the output looks like that, the build succeeded.

Intero Setup

We should check whether Intero works properly. Intero is downloaded and installed automatically if we use Emacs with the Haskell layer, but because we are using VS Code, we need to build it first using Stack.

stack build intero
# output
haskeline- using precompiled package
intero-0.1.31: download
intero-0.1.31: configure
intero-0.1.31: build
intero-0.1.31: copy/register
Completed 2 action(s).

After we reload VS Code, Haskero will initialize automatically and run Intero in the background.

Misc. Files

I also added or updated several files manually before I started coding. You can see these files in the repository. These files are:

  1. .editorconfig
  2. .gitignore
  3. .env.example
  4. Makefile

I use a .env file for the configuration. That's why I also use the dotenv library to parse it.
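Once dotenv has loaded the .env file into the process environment, the settings are ordinary environment variables. Here is a minimal sketch of reading them in one place, using only base. The variable names NSQ_STAT_URL, CSV_FILE and SLACK_URL are the ones used later in this post; Config, buildConfig and loadConfig are hypothetical names I made up for this example, and this is not how the real project does it (the real code calls getEnv where it needs a value).

```haskell
import System.Environment (lookupEnv)

-- | Settings the checker reads from the environment.
-- (Hypothetical record, for illustration only.)
data Config = Config
  { nsqStatUrl :: String
  , csvFile    :: String
  , slackUrl   :: String
  } deriving (Show, Eq)

-- | Build a Config from a lookup function, or report the first
-- missing variable. Taking the lookup as an argument keeps this
-- easy to try without touching the real environment.
buildConfig
  :: Monad m => (String -> m (Maybe String)) -> m (Either String Config)
buildConfig look = do
  nsq   <- require "NSQ_STAT_URL"
  csv   <- require "CSV_FILE"
  slack <- require "SLACK_URL"
  pure (Config <$> nsq <*> csv <*> slack)
 where
  require name =
    maybe (Left ("missing variable: " ++ name)) Right <$> look name

-- | The IO version simply plugs in lookupEnv from base.
loadConfig :: IO (Either String Config)
loadConfig = buildConfig lookupEnv
```

The difference from getEnv is that a missing variable becomes a Left with a readable message instead of an exception in the middle of the run.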

Command Line Argument

My idea is to read the topic(s) from the command line. So, to make a report for topic X, we can use this kind of command:

stack exec nsq-message-checker -- topic X

We can also make a report for all topics at once like this:

stack exec nsq-message-checker -- topic all

To support that style, we will use the turtle library and put the functions in Main.hs:

-- Main.hs

{-# LANGUAGE OverloadedStrings #-}

module Main where

import           NSQ
import           Protolude
import           Turtle

-- dotenv
import qualified Configuration.Dotenv       as Dotenv
import           Configuration.Dotenv.Types

-- main parser
parser :: Parser (IO ())
parser = parseMainCommand
     <|> parseProcessNsqTopic

-- main command
parseMainCommand :: Parser (IO ())
parseMainCommand = pure mainCommand

mainCommand :: IO ()
mainCommand = echo "Use \"-h\" argument for help"

-- topic subcommand
-- stack exec nsq-message-checker -- topic all
-- stack exec nsq-message-checker -- topic optimization_rule
parseProcessNsqTopic :: Parser (IO ())
parseProcessNsqTopic =
  fmap processNsq
       (subcommand "topic"
       "Process specified topic"
       (argText "topic_name" "What topic should be processed? (all for all topics)"))

-- main
main :: IO ()
main = do
  _ <- Dotenv.loadFile defaultConfig
  cmd <- options "NSQ Statistic Checker" parser
  -- run whichever action the parser selected
  cmd

As you can see, the main function is processNsq, which takes the topic name as its argument.

Code Structure

Honestly, I have not found a best-practice example for organizing this code, but my mentor said that I can just put the code directly in the src folder as NSQ.hs. I also separated the type-related code into Types.hs.

├── src
│   ├── NSQ.hs
│   └── Types.hs


After analyzing the response from the NSQ statistics endpoint and the output I expected, I made 5 data types and put them in Types.hs. For the record, the output is written to a CSV file, which is why I use the cassava library in this project.

{-# LANGUAGE DeriveGeneric     #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards   #-}
{-# LANGUAGE TemplateHaskell   #-}

module Types where

import           Data.Aeson
import           Data.Aeson.Casing (aesonPrefix, snakeCase)
import           Data.Text         as T
import           Protolude

-- cassava
import           Data.Csv          (FromRecord, ToNamedRecord, ToRecord)
import qualified Data.Csv          as Cassava

-- | Client data type.
data Client = Client
  { clientClientId                      :: Text -- ^ Client ID.
  , clientHostname                      :: Text -- ^ Hostname of the client.
  , clientVersion                       :: Text -- ^ Client version.
  , clientRemoteAddress                 :: Text -- ^ Remote address of client.
  , clientState                         :: Int -- ^ State of client.
  , clientReadyCount                    :: Int -- ^ Amount of ready messages.
  , clientInFlightCount                 :: Int -- ^ Amount of in-flight messages.
  , clientMessageCount                  :: Int -- ^ Amount of messages.
  , clientFinishCount                   :: Int -- ^ Amount of finished messages.
  , clientRequeueCount                  :: Int -- ^ Amount of requeued messages.
  , clientConnectTs                     :: Int
  , clientSampleRate                    :: Int
  , clientDeflate                       :: Bool
  , clientSnappy                        :: Bool
  , clientUserAgent                     :: Text
  , clientTls                           :: Bool
  , clientTlsCipherSuite                :: Text
  , clientTlsVersion                    :: Text
  , clientTlsNegotiatedProtocol         :: Text
  , clientTlsNegotiatedProtocolIsMutual :: Bool
  } deriving (Show, Generic)

instance ToJSON Client where
  toJSON = genericToJSON $ aesonPrefix snakeCase

instance FromJSON Client where
  parseJSON = genericParseJSON $ aesonPrefix snakeCase

-- | Data type for channel.
data Channel = Channel
  { channelChannelName   :: Text -- ^ The name of the channel.
  , channelDepth         :: Int
  , channelBackendDepth  :: Int
  , channelInFlightCount :: Int -- ^ Amount of in-flight messages.
  , channelDeferredCount :: Int -- ^ Amount of deferred messages.
  , channelMessageCount  :: Int -- ^ Amount of messages.
  , channelRequeueCount  :: Int -- ^ Amount of requeued messages.
  , channelTimeoutCount  :: Int
  , channelClients       :: [Client]
  , channelPaused        :: Bool
  } deriving (Show, Generic)

instance ToJSON Channel where
  toJSON = genericToJSON $ aesonPrefix snakeCase

instance FromJSON Channel where
  parseJSON = genericParseJSON $ aesonPrefix snakeCase

data Topic = Topic
  { topicTopicName    :: Text
  , topicChannels     :: [Channel]
  , topicDepth        :: Int
  , topicBackendDepth :: Int
  , topicMessageCount :: Int
  , topicPaused       :: Bool
  } deriving (Show, Generic)

instance ToJSON Topic where
  toJSON = genericToJSON $ aesonPrefix snakeCase

instance FromJSON Topic where
  parseJSON = genericParseJSON $ aesonPrefix snakeCase

data Statistic = Statistic
  { statisticVersion   :: Text
  , statisticHealth    :: Text
  , statisticTopics    :: [Topic]
  , statisticStartTime :: Int
  } deriving (Show, Generic)

instance ToJSON Statistic where
  toJSON = genericToJSON $ aesonPrefix snakeCase

instance FromJSON Statistic where
  parseJSON = genericParseJSON $ aesonPrefix snakeCase

data Report = Report
  { reportTopicName    :: Text
  , reportMessage      :: Int
  , reportTotalMessage :: Int
  , reportConnection   :: Int
  , reportDate         :: Text
  , reportTime         :: Text
  } deriving (Generic, Show)

instance FromRecord Report

instance ToRecord Report

instance ToNamedRecord Report where
  toNamedRecord Report {..} =
    Cassava.namedRecord
      [ "Topic Name" Cassava..= reportTopicName
      , "Message" Cassava..= reportMessage
      , "Total Message" Cassava..= reportTotalMessage
      , "Connection" Cassava..= reportConnection
      , "Date" Cassava..= reportDate
      , "Time" Cassava..= reportTime
      ]


As I said before, the main logic lives in the processNsq function. This function holds the flow of the process, kept as general as possible. This is the process flow:

  1. Prepare the NSQ statistics URL.
  2. Look up the statistics using that URL and decode the response.
  3. Process the decoded statistics.
  4. Send a notification to Slack to tell us about the latest statistics.

Based on the process flow above, I will describe each function and its child functions (if any), following the code hierarchy. You can find the final processNsq function below:

-- NSQ.hs

-- | Process NSQ `Topic`.
processNsq :: Text -> IO ()
processNsq topicName = do
  nsqStatisticURL <- SE.getEnv "NSQ_STAT_URL"
  let statisticURL topic channel =
        nsqStatisticURL
          ++ "?format=json&topic="
          ++ topic
          ++ "&channel="
          ++ channel
  -- we hardcode the channel name for now (which is "job"), but we can also
  -- send it together with topic name as an argument
  jsonString <- lookupNsqStatistic $ statisticURL (unpack topicName) "job"
  let statistic = eitherDecode jsonString :: Either String Statistic
  case statistic of
    Left  err  -> putStrLn err
    Right stat -> processTopicStatistic stat topicName
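The URL-building step is plain string concatenation, so it can also be written as a small pure function that is easy to try in GHCi. This is just a sketch: base stands in for the value of NSQ_STAT_URL, and the host and port in the usage note are placeholders, not the real endpoint.

```haskell
-- | Build the statistics URL for one topic and one channel.
-- `base` stands in for the value read from NSQ_STAT_URL.
statisticURL :: String -> String -> String -> String
statisticURL base topic channel =
  base ++ "?format=json&topic=" ++ topic ++ "&channel=" ++ channel
```

For example, statisticURL "http://localhost:4151/stats" "optimization_rule" "job" gives "http://localhost:4151/stats?format=json&topic=optimization_rule&channel=job". Note that nothing is URL-encoded here, the same as in processNsq.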

We prepare the NSQ statistics URL with the statisticURL function. After that, we look up the statistics using the lookupNsqStatistic function:

-- NSQ.hs

-- | Lookup `Topic`'s `Statistic`.
lookupNsqStatistic :: String -> IO BSL.ByteString
lookupNsqStatistic url = simpleHttp url

We decode the response using eitherDecode from Aeson and, if decoding succeeds, process it using processTopicStatistic.

-- NSQ.hs

-- | Process `Topic`'s `Statistic` and generate `Report` file.
processTopicStatistic :: Statistic -> Text -> IO ()
processTopicStatistic statistic topicName = do
  topicReports <- breakdownTopicReport (statisticTopics statistic) topicName
  reportToCSV $ L.foldl' (++) [] topicReports
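The fold in processTopicStatistic only flattens a list of lists into one list. As a small sketch (flattenFold and flattenConcat are names I made up for this example), here is the same fold next to concat from the Prelude, which does the same job:

```haskell
import Data.List (foldl')

-- | Flatten with an explicit left fold, as in processTopicStatistic.
flattenFold :: [[a]] -> [a]
flattenFold = foldl' (++) []

-- | Flatten with concat from the Prelude.
flattenConcat :: [[a]] -> [a]
flattenConcat = concat
```

Both give [1,2,3] for [[1,2],[],[3]]. The left fold keeps re-copying the accumulated list on every step, so concat is the more usual choice, although for a handful of topics it makes no practical difference.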

As you can see, we need to break the statistics down into our own data type using the breakdownTopicReport function before we write them to the CSV file. breakdownTopicReport takes 2 arguments: a list of Topic values and the topic name. Based on the topic name, it iterates over the topic names to process (every name in topicNames, whose definition is not shown here, when the argument is all; otherwise just the given name) and returns the reports in IO as topicReports.

-- NSQ.hs

-- | Map over `Topic` and return list of `Report`s.
breakdownTopicReport :: [Topic] -> Text -> IO [[Report]]
breakdownTopicReport topics topicName
  | topicName == "all" = do
    mapM (iterateTopicName topics) topicNames
  | otherwise = do
    mapM (iterateTopicName topics) [topicName]
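The two guards differ only in which list of names they map over, so that choice can be pulled out into a pure function. A minimal sketch, using String instead of Text to stay within base: selectTopicNames is a hypothetical name, and its first argument is a stand-in for topicNames.

```haskell
-- | Decide which topic names to process.
-- The first argument is a stand-in for the list of all known topics.
selectTopicNames :: [String] -> String -> [String]
selectTopicNames known "all" = known   -- "all" expands to every known topic
selectTopicNames _     name  = [name]  -- anything else is a single topic
```

With this, breakdownTopicReport would need only one mapM over the selected names.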

iterateTopicName populates the reports for the specified topic name and returns them as a list of Report values.

-- NSQ.hs

-- | Populate `Report` for specified `Topic`.
iterateTopicName :: [Topic] -> Text -> IO [Report]
iterateTopicName topics topicName = do
  let topic    = filterTopicName topics topicName
      channels = case topic of
        Just t  -> topicChannels t :: [Channel]
        Nothing -> []
  reports <- populateReport topic channels
  return reports

-- | Get `Topic` from array of `Topic`.
filterTopicName :: [Topic] -> Text -> Maybe Topic
filterTopicName topics topicName =
  case L.filter (\x -> topicTopicName x == topicName) topics of
    t : _ -> Just t
    _     -> Nothing

-- | Map over the `Channel`s to get the `Report`s.
populateReport :: Maybe Topic -> [Channel] -> IO [Report]
populateReport Nothing      _     = return []
populateReport (Just topic) chans = mapM (populRep topic) chans
 where
  -- build one Report row per channel, stamped with the current date and time
  populRep t channel = do
    formattedDate <- getFormattedDate
    formattedTime <- getFormattedTime
    return $ Report (topicTopicName t)
                    (channelDepth channel)
                    (channelMessageCount channel)
                    (L.length $ channelClients channel)
                    formattedDate
                    formattedTime

-- | Get date which has been formatted into string.
getFormattedDate :: IO Text
getFormattedDate = do
  currentLocalTime <- getZonedTime
  return $ T.pack $ formatTime defaultTimeLocale "%Y-%m-%d" currentLocalTime

-- | Get time which has been formatted into string.
getFormattedTime :: IO Text
getFormattedTime = do
  currentLocalTime <- getZonedTime
  return $ T.pack $ formatTime defaultTimeLocale "%H:%M" currentLocalTime
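getFormattedDate and getFormattedTime each read the clock separately. As a small sketch of an alternative, both strings can be formatted from a single timestamp, so the date and the time always belong to the same moment. formatDateAndTime and exampleTime are hypothetical names, and the fixed timestamp is only there to make the example repeatable.

```haskell
import Data.Time
  ( FormatTime
  , UTCTime (..)
  , defaultTimeLocale
  , formatTime
  , fromGregorian
  )

-- | Format the date and the time of day from one timestamp.
-- Works for any time type that can be formatted, including the
-- ZonedTime returned by getZonedTime.
formatDateAndTime :: FormatTime t => t -> (String, String)
formatDateAndTime t =
  ( formatTime defaultTimeLocale "%Y-%m-%d" t
  , formatTime defaultTimeLocale "%H:%M" t
  )

-- | A fixed example timestamp: 2018-06-01 13:05:00 UTC.
exampleTime :: UTCTime
exampleTime = UTCTime (fromGregorian 2018 6 1) (13 * 3600 + 5 * 60)
```

formatDateAndTime exampleTime gives ("2018-06-01","13:05"). In the real code it could be used as formatDateAndTime <$> getZonedTime.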

Back in processTopicStatistic, we need to prepare the header of the CSV file and pass it along in our reportToCSV function. We get the file location from the environment.

-- NSQ.hs

-- | Put `Report`s into csv file.
reportToCSV :: [Report] -> IO ()
reportToCSV reports = do
  csvFile <- SE.getEnv "CSV_FILE"
  BSL.appendFile csvFile $ Cassava.encodeByName reportHeader reports

-- | List of header to be used in `Report`.
reportHeader :: Header
reportHeader = Vector.fromList
  ["Topic Name", "Message", "Total Message", "Connection", "Date", "Time"]

So far, we have looked up the statistics from NSQ, decoded them, mapped them into our own data types, and written them to a CSV file. The last step is sending a notification to Slack to tell us about the new CSV file. We will use a Slack webhook to make this easy. To do this, we need to prepare an object to send to our webhook URL. Do not forget to encode the object beforehand.

-- NSQ.hs

-- | Send notification to slack channel using web hook.
sendSlackNotification :: IO ()
sendSlackNotification = do
  manager <- newManager tlsManagerSettings
  let requestObject = object
        [ "name" .= ("NSQ Statistic Checker" :: String)
        , "text" .= ("NSQ Statistic Report has been compiled" :: String)
        ]

  slackURL       <- SE.getEnv "SLACK_URL"
  initialRequest <- parseRequest slackURL

  let request = initialRequest
        { method         = "POST"
        , requestHeaders = [("Content-Type", "application/json")]
        , requestBody    = RequestBodyLBS $ encode requestObject
        }
  response <- httpLbs request manager
  putStrLn $ responseBody response

How to use it?

I have already provided a Makefile, so you can just build the project and execute the run target of the Makefile. You can also copy the command and use the custom argument we made earlier to process a specific topic.

Build the project

make build

Run the executable

Run it with make:

make run

Run it with stack:

stack exec nsq-message-checker -- topic optimization_rule

That's all I can share for now. I hope it will benefit anyone who reads it in the future. Thank you very much to my mentor and to you, the reader. Forgive me for the crappy post. :D