
Pulling Stock Market Data with CloudQuery

Tim Armstrong


In my recent presentation at State of Open Con, I wanted to push the envelope and go beyond what we usually do with CloudQuery. So I set my sights on a subject that is about as far away from CloudQuery's normal use cases as possible: the delayed market data from the London Stock Exchange.
This is that talk in blog format.
This data is fascinating, both because it contains individual transactions that were executed on the London Stock Exchange, and because it is data that they are legally required to provide but don’t want to - so they make it as awkward to retrieve as they can.
As regular readers of this blog will know, CloudQuery is a really powerful and flexible ELT Framework. If you’re a new reader, however, you might be thinking, “Did he just misspell ETL?” - and for that question I’d point you to some of our other content on ETL vs ELT.

Architecture #

Let’s start off with the High-Level Architecture of CloudQuery.
At the highest level, a CloudQuery Source Plugin is a gRPC service that collects data and provides it in an Apache Arrow-based tabular format.
The CloudQuery CLI (or CloudQuery Cloud, if you prefer to reduce your DevOps workload) uses this service and the tables it provides to collect data, which it then writes out via destination plugins - gRPC services that convert the tabular data into whatever storage format is needed.
This means that you can write a Source plugin once, and it will automatically work with any destination plugin without the need for any destination-specific code.
In theory, these plugins can be written from scratch to conform to the interface definition, but we strongly suggest plugin developers use one of our SDKs as this abstracts away a lot of the complexity.

SDK Architecture #

So, let’s look at the architecture of a plugin that uses the Python SDK.
In the bottom right block, you’ll see that the SDK takes care of the gRPC interface, which, when called, initializes the Plugin Class and triggers the Scheduler to spawn the processing threads for each of the table resolvers.
In the upper left block, you’ll see the Table Resolvers that are responsible for iterating over the relevant generator from the API Client Class. These generators pull data from the target API, yielding one dictionary object per row. The Table Resolvers then perform any validation as well as any name and type munging needed for each Table row, before yielding the response back to the SDK.
The SDK then handles the conversion from the native dictionary object into the Apache Arrow structure defined in the Table Class, which it streams to the Orchestrator as quickly as it’s available.
This architecture enables plugins to process immense datasets as quickly as possible and without using significant amounts of RAM.
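To make that flow concrete, here is a deliberately simplified, framework-free sketch of the pattern the SDK implements for you. None of these names are SDK APIs; they just mirror the roles described above:
# Conceptual sketch only: the real SDK wires these roles together over gRPC
# and streams the results onward as Apache Arrow data.

def api_rows():
    # Role: API client generator - pulls from the target API, yielding one dict per row
    yield {"mifidPrice": "1.23", "mifidQuantity": "100000"}


def resolve(rows):
    # Role: table resolver - validates, renames, and retypes each row
    for row in rows:
        yield {"price": float(row["mifidPrice"]), "quantity": int(row["mifidQuantity"])}


for record in resolve(api_rows()):
    # Role: the SDK - converts each dict into the Arrow structure defined by the
    # Table class and streams it onward as soon as it is available
    print(record)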

The Data #

Now that we have a high-level understanding of how plugins work, let's take a look at the data we're going to use for this project. To keep things simple, we're only going to look at one of the available endpoints for now, but you can find the others implemented in the linked project repo.
So, the first thing we want to do is download a CSV file for the XLON Post Trade data. These files contain every single transaction that occurred on the London Stock Exchange in a given one-minute-long trading window; as such, there is one file for every minute the market is open.
This will give us an idea of the specific formatting, the available columns, and suitable datatypes. In an ideal world, we'd be able to get this information from a standards document or API spec, but the LSE doesn't seem to want to provide one for the free delayed data, so here we are. Another downside of this free data is that it only covers the most recent trading day.
Complaints aside, let's take a look at the data. Here's an example extract:
distributionTime;sourceVenue;instrumentId;transactionIdentificationCode;mifidPrice;mifidQuantity;tradingDateAndTime;instrumentIdentificationCodeType;instrumentIdentificationCode;priceNotation;priceCurrency;notionalAmount;notionalCurrency;venueOfExecution;publicationDateAndTime;transactionToBeCleared;measurementUnit;quantityInMeasurementUnit;type;mifidFlags
2024-01-30T16:40:05.520572Z;1;72057594038083673;928426642916541;1.23;100000;2024-01-30T16:40:05.519559Z;;VGG4392T1075;;GBX;;;AIMX;2024-01-30T16:40:05.520572Z;0;;;;,,,ALGO,,,
2024-01-30T16:40:05.521181Z;1;72057594038083673;928426642916543;1.23;302;2024-01-30T16:40:05.519559Z;;VGG4392T1075;;GBX;;;AIMX;2024-01-30T16:40:05.521181Z;0;;;;,,,ALGO,,,
2024-01-30T16:40:05.521260Z;1;72057594038083673;928426642916544;1.23;19855;2024-01-30T16:40:05.519559Z;;VGG4392T1075;;GBX;;;AIMX;2024-01-30T16:40:05.521260Z;0;;;;,,,ALGO,,,
Here, we can see that the data is semicolon-separated (;) and uses 5 data types:
  • ISO8601 Datetime (with microsecond/μs accuracy)
  • unsigned int
  • float - technically, MiFID II / MiFIR specifies that prices should be in fixed-point decimal (with 18/13 precision) - but again, this free data doesn't appear to comply with that standard
  • string
  • CSV-separated flags
There are also a bunch of null fields that we can probably ignore.
If we parse this, transpose it to make it a little more legible, and annotate it with the datatypes, it looks like this:
| Field Names | Datatypes | 1 | 2 | 3 |
|---|---|---|---|---|
| distributionTime | ISO8601 Datetime | 2024-01-30T16:40:05.520572Z | 2024-01-30T16:40:05.521181Z | 2024-01-30T16:40:05.521260Z |
| sourceVenue | Unsigned Int | 1 | 1 | 1 |
| instrumentId | Unsigned Int | 72057594038083673 | 72057594038083673 | 72057594038083673 |
| transactionIdentificationCode | Unsigned Int | 928426642916541 | 928426642916543 | 928426642916544 |
| mifidPrice | Float | 1.23 | 1.23 | 1.23 |
| mifidQuantity | Unsigned Int | 100000 | 302 | 19855 |
| tradingDateAndTime | ISO8601 Datetime | 2024-01-30T16:40:05.519559Z | 2024-01-30T16:40:05.519559Z | 2024-01-30T16:40:05.519559Z |
| instrumentIdentificationCodeType | N/A - Null | - | - | - |
| instrumentIdentificationCode | String (ISIN Format) | VGG4392T1075 | VGG4392T1075 | VGG4392T1075 |
| priceNotation | N/A - Null | - | - | - |
| priceCurrency | String (ISO4217 Format) | GBX | GBX | GBX |
| notionalAmount | N/A - Null | - | - | - |
| notionalCurrency | N/A - Null | - | - | - |
| venueOfExecution | String | AIMX | AIMX | AIMX |
| publicationDateAndTime | ISO8601 Datetime | 2024-01-30T16:40:05.520572Z | 2024-01-30T16:40:05.521181Z | 2024-01-30T16:40:05.521260Z |
| transactionToBeCleared | Unsigned Int | 0 | 0 | 0 |
| measurementUnit | N/A - Null | - | - | - |
| quantityInMeasurementUnit | N/A - Null | - | - | - |
| type | N/A - Null | - | - | - |
| mifidFlags | CSV | ,,,ALGO,,, | ,,,ALGO,,, | ,,,ALGO,,, |
From the data, it appears the transactionToBeCleared field is always 0, the sourceVenue field is always 1, and none of the fields marked here as N/A - Null ever get populated.
Dropping these rows leaves us with the following list of fields to collect:
| Field Names | Datatypes |
|---|---|
| distributionTime | ISO8601 Datetime |
| instrumentId | Unsigned Int |
| transactionIdentificationCode | Unsigned Int |
| mifidPrice | Float |
| mifidQuantity | Unsigned Int |
| tradingDateAndTime | ISO8601 Datetime |
| instrumentIdentificationCode | String (ISIN Format) |
| priceCurrency | String |
| venueOfExecution | String |
| publicationDateAndTime | ISO8601 Datetime |
| mifidFlags | CSV |

The Table class #

So, looking at the SDK then, how does this translate into a Table class?
In the SDK, the Table class defines the structure of the Table row, which is used to both generate any migrations for the database and to define the Apache Arrow messages that carry the table row data.
If you start with a fresh copy of the Python Plugin Template, you'll find a directory called tables, and in there, a module called items.py.
In items.py, you'll find an example child of the Table class, which has an __init__ definition containing a name, a human-readable title, and a list of columns.
If we use the data types above, clean up the field names to something that makes a little more sense, populate the title and name fields, and rename the class to fit, we end up with something that looks like the following Table class:
# Imports as used alongside the Python SDK (exact module paths may vary by SDK version)
import pyarrow as pa
from zoneinfo import ZoneInfo

from cloudquery.sdk.schema import Column, Table


class XLONPostDelayed(Table):
    def __init__(self) -> None:
        super().__init__(
            name="xlon_post_delayed",
            title="LSE Post-trade Delayed",
            is_incremental=True,
            columns=[
                Column("distribution_timestamp", pa.timestamp("us", ZoneInfo("UTC"))),
                Column("trading_timestamp", pa.timestamp("us", ZoneInfo("UTC"))),
                Column("transaction_id", pa.uint64(), primary_key=True, unique=True),
                Column("instrument_id", pa.uint64()),
                Column("isin_instrument_code", pa.string()),
                Column("currency", pa.string()),
                Column("price", pa.float64()),
                Column("quantity", pa.uint64()),
            ],
        )

    @property
    def resolver(self):
        return XLONPostDelayedResolver(table=self)
You’ll notice here that I’ve reordered the fields a bit to group similar data together. This is an aesthetic decision that has no practical benefits beyond looking nicer.
It might be a good idea to convert the price back to a fixed-point decimal, as fixed-point arithmetic is intended to improve calculation performance and to keep any rounding-based precision errors within a known range, independent of the software and hardware platforms used - but I'll leave this up to you.
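If you do go that route, the two edits might look roughly like the following sketch, assuming the MiFID-style 18/13 precision (pyarrow's decimal128 type plus Python's decimal module; the exact precision and scale are your call):
from decimal import Decimal

import pyarrow as pa

# In the Table class: store prices as fixed-point decimals instead of floats
# (18 total digits, 13 of them after the decimal point)
Column("price", pa.decimal128(18, 13))

# In the resolver: parse the CSV value straight into a Decimal so it never
# passes through a binary float on the way in
price = Decimal(item_response.get("mifidPrice"))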

The API Client #

So, we have our table defined; now we need to implement the API client.
Since the London Stock Exchange seems to have decided to make it unreasonably difficult to get a copy of the free data, this one will be, unfortunately, interesting. As with so much of the free data available in the world, we will have to resort to a little web scraping.

Login #

Essentially, we need to scrape the login form to get the CSRF token so that we can login, get a session cookie, and start pulling data.
In an ideal world, they would implement an API key option, but as with so much of the data we want to collect, that's not an option here.
Looking at the login page, we have a simple HTML form, from which we'll want to capture the hidden _csrf field.
To do this, we’ll use requests and beautifulsoup. The process here is fairly straightforward: Create a Session; Use the Session to pull the login page; Use beautifulsoup to parse the login page and retrieve the _csrf value; Post our login credentials and the CSRF token to the login page; and raise an exception if we get an error response.
Again, the Python Plugin Template gives us a framework to build this into: in the directory called example, you'll find a module called client.py that has a class called ExampleClient. If we rename the class to LSEGClient, set up our login fields as arguments, and write the __login method, it should look something like this:
from urllib.parse import urljoin

from bs4 import BeautifulSoup
from requests import Session


class LSEGClient:
    def __init__(
        self, username: str, password: str, base_url="https://dmd.lseg.com/dmd/"
    ):
        self._base_url = base_url
        self._username = username
        self._password = password
        self._session = self.__login()

    def __login(self):
        session = Session()
        login_page = session.get(urljoin(self._base_url, "login.html"))
        login_soup = BeautifulSoup(login_page.content, "html.parser")
        # The hidden _csrf input is the only hidden field in the login form
        csrf_token = login_soup.form.find("input", type="hidden")["value"]
        result = session.post(
            urljoin(self._base_url, "login.html"),
            data={
                "username": self._username,
                "password": self._password,
                "_csrf": csrf_token,
            },
        )
        result.raise_for_status()
        return session
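Before wiring this into the plugin, you can smoke-test the login flow from a Python shell (the credentials here are placeholders):
# Quick manual check that the scraped CSRF login works;
# if no exception is raised, we have an authenticated session
client = LSEGClient(username="your-username", password="your-password")
print(client._session.cookies)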

Files and Generators #

Now that we have our logged-in session, we can write the code to pull the data.
Looking at the path for one of these files reveals a relatively simple pattern. Other than the directory path, it boils down to a table identifier followed by a DateTime.
If we look through these DateTimes, we'll see that they are at minute intervals for today's date, starting at market-open (08:00) and ending at market-close (16:30).
Using this knowledge, we can avoid additional web scraping and build an iterator like this one:
# Another method of LSEGClient - additionally requires:
#   from csv import DictReader
#   from datetime import datetime, time, timedelta
#   from typing import Any, Dict, Generator
def xlon_iterator(self) -> Generator[Dict[str, Any], None, None]:
    dir_path = urljoin(self._base_url, "download/posttrade/LSE/FCA/")
    end = datetime.combine(datetime.now().date(), time(16, 30))
    cursor = datetime.combine(end.date(), time(8, 0))
    if cursor.isoweekday() > 5:
        return  # Saturday or Sunday: the market is closed, nothing to fetch
    while cursor < datetime.now() and cursor <= end:
        response = self._session.get(
            urljoin(
                dir_path,
                f"XLON-post-{cursor.year}-{cursor.month:02}-{cursor.day:02}T{cursor.hour:02}_{cursor.minute:02}.csv",
            )
        )
        response.raise_for_status()
        csv_file = response.content.decode()
        reader = DictReader(csv_file.splitlines(), delimiter=";")
        for row in reader:
            yield row
        cursor += timedelta(minutes=1)
The first thing it does is set up the cursor that's going to tick through the known link URLs for the current day, returning early if we already know the market will be closed (i.e., it's Saturday or Sunday). Then it loops over each minute from market-open to market-close, pulling the CSV file, which it then decodes and parses, yielding each row to the resolver.

The Resolver #

Now, at this point, we have our API Client and our Table class, but we haven't reconciled the difference between our Table class's column names and the CSV files' column names, which would lead to some problems if we didn't correct it. This is where the resolver comes in.
The resolver iterates over the response from the API Client, cleaning, renaming, and validating the data before yielding to the SDK.
By default, the resolver yields exactly what it receives, so if the names are aligned, and you don’t want to check the values are sane, you can skip this step. But since we chose slightly different names, and we don’t trust the quality of our data (given the number of null fields), we have to do something about it.
There are two ways we can tackle this:
  1. Change our Table class’s column names to match exactly the names that are in the API response - which is fine if those make sense and don’t change in the future
  2. Align the names using the resolver, which we might need to do in the future anyway if the API response format changes
Since we should check for data conformity anyway, let’s do the latter.
To do this, the first thing we need to do is remap the names and ensure that our CSV data types are what we expect. Then, we’ll do some sanity checks that skip any potential nonsense rows.
# Note: TableResolver, Resource, and Client come from the CloudQuery SDK and the
# plugin template; datetime, ZoneInfo, and typing imports are also needed here.
class XLONPostDelayedResolver(TableResolver):
    def __init__(self, table) -> None:
        super().__init__(table=table)

    def resolve(
        self, client: Client, parent_resource: Resource
    ) -> Generator[Any, None, None]:
        for item_response in client.client.xlon_iterator():
            # datetime.fromisoformat only accepts the trailing "Z" on Python 3.11+
            cleaned_row = {
                "distribution_timestamp": datetime.fromisoformat(item_response.get("distributionTime")),
                "trading_timestamp": datetime.fromisoformat(item_response.get("tradingDateAndTime")),
                "transaction_id": int(item_response.get("transactionIdentificationCode")),
                "instrument_id": int(item_response.get("instrumentId")),
                "isin_instrument_code": item_response.get("instrumentIdentificationCode"),
                "currency": item_response.get("priceCurrency"),
                "price": float(item_response.get("mifidPrice")),
                "quantity": int(item_response.get("mifidQuantity")),
            }
            if cleaned_row["trading_timestamp"] > datetime.now(tz=ZoneInfo("UTC")):
                continue  # trade has an impossible datetime (it happened in the future)
            if not cleaned_row["isin_instrument_code"]:
                continue  # trade must have an instrument code (DictReader yields "" for blank fields, not None)
            if not cleaned_row["currency"]:
                continue  # trade must have a currency
            if cleaned_row["quantity"] < 0:
                continue  # trade must not have a negative quantity
            yield cleaned_row

The Config Spec & Plugin Definition #

With that all sorted, we just need a way to provide the credentials to the API Client. To do this, we need to adjust our CloudQuery plugin's config Spec and Plugin interface.
In the Python Plugin Template, you'll find a directory called client containing a module called client.py, which holds the Spec - a dataclass that represents the fields we expect from our CloudQuery config file.
In this case, we’ll replace access_token with username and password, and we’ll adjust the base_url field’s default value. You can also add some validation to ensure that the username and password fields are populated.
This should look something like:
@dataclass
class Spec:
    username: str
    password: str
    base_url: str = field(default="https://dmd.lseg.com/dmd/")
    concurrency: int = field(default=DEFAULT_CONCURRENCY)
    queue_size: int = field(default=DEFAULT_QUEUE_SIZE)

    def validate(self):
        if self.username is None:
            raise Exception("username must be provided")
        if self.password is None:
            raise Exception("password must be provided")
Finally, we need to tell the SDK how to represent our plugin to CloudQuery.
In the Python Plugin Template, you'll find a module called plugin.py; here, you'll find a few constants (PLUGIN_NAME, PLUGIN_VERSION, and TEAM_NAME) and a class called ExampleClient. You'll need to update these values appropriately to suit your plugin.
Additionally, in the get_tables function of this class, you’ll find a list called all_tables. This is where you list your instances of the Table class - in this case, that’s just tables.XLONPostDelayed(), but if you implement more than one table, you must also add these here.
Note: If you’re working with tables that have relations (i.e., a table is the parent of one or more child tables), you only need to list the parent tables here. But this is out of the scope of this tutorial, so expect a blog post dedicated to relations in the near future.
PLUGIN_NAME = "lseg"
PLUGIN_VERSION = "0.0.1"
TEAM_NAME = "cloudquery"
PLUGIN_KIND = "source"
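For reference, the all_tables list mentioned above ends up as short as this (a sketch; the surrounding get_tables method comes from the template and may differ slightly between SDK versions):
        all_tables = [
            tables.XLONPostDelayed(),
        ]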

Running CloudQuery #

At this point, this single table version of our plugin is complete. We’ve implemented everything we need to pull the XLON Post data. So, how do we do that?
In the Python Plugin Template, you’ll find a file called TestConfig.yaml.
We need to change the name of the Source plugin from comics to lseg (matching the name defined in the instance of the Plugin class). Then, we need to add the username and password fields, and since the base_url field has a default value, we can comment that out or even delete it.
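After those edits, TestConfig.yaml ends up looking roughly like the sketch below. The overall source/destination shape is standard CloudQuery config, but treat the specific values - the local gRPC address, the SQLite plugin version, and the database path - as placeholders for whatever your setup uses:
kind: source
spec:
  name: "lseg"
  registry: "grpc"
  path: "localhost:7777"  # wherever your local plugin's gRPC server is listening
  tables: ["*"]
  destinations: ["sqlite"]
  spec:
    username: "${LSEG_USERNAME}"
    password: "${LSEG_PASSWORD}"
---
kind: destination
spec:
  name: "sqlite"
  registry: "cloudquery"
  path: "cloudquery/sqlite"
  version: "vX.Y.Z"  # pin to the latest release listed on hub.cloudquery.io
  spec:
    connection_string: "./lse_trades.db"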
Now that’s done, and assuming you have CloudQuery installed, you just need to run
cloudquery sync TestConfig.yaml
Now, this is quite a lot of data to pull: it takes around 30 minutes to sync a full day's worth of trading data from the London Stock Exchange, even on a good connection. But if you start it after 16:45 UK time (taking daylight saving into account), then when it's done you'll have a complete copy of all the published raw trading data for that day.
What's even more remarkable is that while this setup loads the data into an SQLite DB, changing it to use any destination plugin you can find on hub.cloudquery.io is as simple as updating the destination section of the TestConfig.yaml file - no further work needed.
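For example, swapping SQLite for the PostgreSQL destination is just a different destination block, roughly like this (the version and connection string are again placeholders):
kind: destination
spec:
  name: "postgresql"
  registry: "cloudquery"
  path: "cloudquery/postgresql"
  version: "vX.Y.Z"  # pin to the latest release listed on hub.cloudquery.io
  spec:
    connection_string: "postgresql://user:password@localhost:5432/lse"
Remember to also update the destinations list in the source spec to match the new destination's name.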
For the completed code from this post, check out the one_table branch.