
Apache Beam pipeline not running to completion after applying windowing and a trigger, and no errors thrown - Stack Overflow


Hello, I am in the process of creating an Apache Beam data pipeline that runs on GCP using Dataflow as the runner. My code is below. It does not throw any errors, but no data is written into BigQuery: when I check the job execution graph there seems to be no activity on the "Write to BigQuery" section of the pipeline, yet I can see data coming in from the Pub/Sub step. I have even gone ahead and added a trigger, but still no data is coming out. Please assist. Thanks.
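
As a quick sanity check, the parse step can be exercised locally with the DirectRunner on a hand-made payload. This is a minimal sketch of mine, separate from the streaming job; the payload value is an assumption, and ParsePubSubMessage is the DoFn defined in the pipeline code below:

import json
import apache_beam as beam

# Assumed payload shape: a JSON object carrying one hex CAN frame string.
sample = json.dumps({"can_data_frame_1": "001F8A2B"}).encode("utf-8")

with beam.Pipeline() as p:  # DirectRunner by default
    (p
     | beam.Create([sample])
     | beam.ParDo(ParsePubSubMessage())
     | beam.Map(print))

If the DoFn is healthy, this prints one dict per element, which narrows the problem to the windowed/streaming part of the job.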

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.dataframe.convert import to_dataframe, to_pcollection
from apache_beam import window
from apache_beam.transforms.trigger import AfterWatermark, AccumulationMode
import typing
import pandas as pd


class BmsSchema(typing.NamedTuple):
  can_data_frame_1: typing.Optional[str]


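# Register a RowCoder so Beam treats BmsSchema as a schema-aware row type,
# which the to_dataframe() conversion below relies on.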
beam.coders.registry.register_coder(BmsSchema, beam.coders.RowCoder)


class ParsePubSubMessage(beam.DoFn):
  def process(self, message):
      import json
      all_columns = [
          "can_data_frame_1"
      ]
      main_dict = dict(zip(all_columns, [None] * len(all_columns)))
      record = json.loads(message.decode('utf-8'))
      main_dict.update(record)
      yield {
          all_columns[0]: main_dict[all_columns[0]]}


def run():
  options = PipelineOptions(
      project='dwingestion',
      runner='DataflowRunner',
      streaming=True,
      temp_location='gs://....../temp',
      staging_location='gs://.........../staging',
      region='europe-west1',
      job_name='.........streaming-pipeline-dataflow',
      save_main_session=True,
      flags=['--allow_unsafe_triggers']
  )

  options.view_as(StandardOptions).streaming = True

  input_subscription = 'projects/..._data_streaming'

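  # Output schema for BigQuery; the field name must match the keys of the
  # dicts handed to WriteToBigQuery below ("current_mA").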
  table_schema = {
      "fields": [
          {"name": "current_mA", "type": "INTEGER", "mode": "NULLABLE"}
      ]
  }

  with beam.Pipeline(options=options) as p:
      messages = (p
                  | 'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=input_subscription)
                  | 'Apply Fixed Window' >> beam.WindowInto(
                      window.FixedWindows(60),
                      trigger=AfterWatermark(),
                      allowed_lateness=window.Duration(10),
                      accumulation_mode=AccumulationMode.DISCARDING
                  )
                  | 'Parse PubSub Message' >> beam.ParDo(ParsePubSubMessage())
                  | 'Attaching the schema' >> beam.Map(lambda x: BmsSchema(**x)).with_output_types(BmsSchema)
                  )
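
      # Note: AfterWatermark() emits one on-time pane per 60s window when the
      # watermark passes the window end; with DISCARDING mode, any pane fired
      # for data arriving within the 10s allowed_lateness contains only the
      # late elements.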

      # Convert the messages to a DataFrame
      df = to_dataframe(messages)
      

      # Extract and process the 'current_mA' field
      df['current_mA'] = df['can_data_frame_1'].str[4:8].apply(lambda x: int(x, 16) if pd.notna(x) else 0)
      df['current_mA'] = df['current_mA'].where(df['current_mA'] < 0x8000, df['current_mA'] - 0x10000)
      df['current_mA'] = df['current_mA'] * 10

      
      # Convert back to PCollection and map to dictionaries
      transformed_pcol = (
          to_pcollection(df)
          | 'Log Transformed PCollection' >> beam.Map(lambda x: (print(f"Transformed Row: {x}"), x)[1])  # Debugging
          | 'Convert to Dict with Native Types' >> beam.Map(lambda row: {
              "current_mA": int(row.current_mA) if row.current_mA is not None else None
          })
      )

      # Write to BigQuery
      transformed_pcol | 'Write to BigQuery' >> WriteToBigQuery(
          table='..........table_test_all_columns_04',
          write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
          create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
          schema=table_schema,
          custom_gcs_temp_location='gs://......_template/temp'
      )


if __name__ == '__main__':
  run()
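
For reference, the same current_mA decoding can also be expressed without the DataFrame API. The following is a minimal, untested sketch (decode_current_ma is a name I made up; the field logic is taken from the code above) that could replace the to_dataframe/to_pcollection section, which may help isolate whether the DataFrame conversion is where elements stall:

def decode_current_ma(row):
    # Same logic as the DataFrame version: hex chars [4:8], two's-complement
    # sign correction, then a x10 scale.
    frame = row.can_data_frame_1
    value = int(frame[4:8], 16) if frame is not None else 0
    if value >= 0x8000:
        value -= 0x10000
    return {"current_mA": value * 10}

transformed_pcol = messages | 'Decode current_mA' >> beam.Map(decode_current_ma)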

